package com.bodystm.server;

import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang.ArrayUtils;
import org.apache.log4j.Logger;

import com.bodystm.algorithm.DllInterface;
import com.bodystm.algorithm.IirFilter;
import com.bodystm.algorithm.LowPerf;
import com.bodystm.algorithm.MathTools;
import com.bodystm.algorithm.MedFilter;
import com.bodystm.algorithm.MedfilterBO;
import com.bodystm.bean.Bed;
import com.bodystm.bean.Ecg;
import com.bodystm.bean.Oximetry;
import com.bodystm.bean.datadeal.BodystmProduct;
import com.bodystm.bean.datadeal.BreathRateAnalysis;
import com.bodystm.bean.datadeal.EcgAnalysis;
import com.bodystm.bean.datadeal.OximetryAnalysis;
import com.bodystm.config.PublicSetting;
import com.bodystm.util.MathUtils;
import com.bodystm.util.SerializeUtil;
import com.bodystm.web.DeviceSettings;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import redis.clients.jedis.Jedis;
/**
 * Consumer thread for the pulse-oximetry packets of one concentrator.
 *
 * <p>The thread blocks on a redis list (the key is the byte array passed to the
 * constructor), feeds every received packet through the native filter / SpO2
 * algorithms exposed by {@link DllInterface}, updates the {@link Oximetry} bean of
 * the bed bound to the concentrator MAC and pushes the filtered infrared waveform
 * to the WebSocket clients registered for that MAC.
 *
 * <p>Usage:
 * <pre>
 * ScheduleMQ4Oximetry consumer = new ScheduleMQ4Oximetry(key);
 * consumer.start();
 * </pre>
 *
 * <p>The native handles are created when the object is constructed and released
 * when {@link #run()} finishes.
 *
 * @author ehl
 */
public class ScheduleMQ4Oximetry extends Thread {
	Logger logger=Logger.getLogger(ScheduleMQ4Oximetry.class);
	/** First six bytes of the redis key. */
	private byte[] mac;
	/** {@link #mac} converted by {@code ByteUtil.convertByteToString}; used as lookup key for beds and channels. */
	private String strMac;
	/** Redis list key this consumer pops from (the full constructor argument). */
	private byte[] rediskey;
	/** Oximetry bean of the bed currently bound to {@link #strMac}; re-resolved before every pop, may be null. */
	public Oximetry oximetry=null;
	private OximetryAnalysis oximetryAnalysis=new OximetryAnalysis();
	//=============== the algorithms are called through the dll =====================
	// Each long is an opaque handle returned by the matching *_C() call and must be
	// handed back to the matching *_D() call (see releaseNativeHandles()).
	private long medfilterBO4Irfrared=DllInterface.INSTANCE.MedFilterBO_C();// median filter (infrared)
	private long medfilterBO4Red=DllInterface.INSTANCE.MedFilterBO_C();// median filter (red); created but not used in run()
	private long iirFilter4Irfrared=DllInterface.INSTANCE.IIFilter_C();// low-pass filter (infrared); created but not used in run()
	private long iirFilter4Red=DllInterface.INSTANCE.IIFilter_C();// low-pass filter (red); created but not used in run()
	/** Java-side buffers holding the raw red / infrared samples until an SpO2 calculation is due. */
	private LowPerf lowPerf=new LowPerf();
	private long lowPerf_dll=DllInterface.INSTANCE.butter_C();// low-pass filter
	private long SSF_Calc=DllInterface.INSTANCE.SSF_Calc_C();// created but not used in run()
	private long mathTools=DllInterface.INSTANCE.Resampling_C();// resampler handle
	private long spO2Algorithm_C=DllInterface.INSTANCE.SpO2Algorithm_C();
	//===================================================
	/** Bed currently bound to {@link #strMac}; re-resolved before every pop, may be null. */
	private Bed bed=null;

	/**
	 * @param data redis list key to consume; its first six bytes are taken as the
	 *             concentrator MAC
	 */
	public ScheduleMQ4Oximetry(byte[] data){
		this.mac=ArrayUtils.subarray(data,  0, 6);
		this.strMac=ByteUtil.convertByteToString(mac);
		this.rediskey=data;
	}

	/**
	 * Pops packets from redis until the thread is interrupted or an exception occurs,
	 * then closes the redis connection and releases all native handles.
	 */
	@Override
	public void run(){
		Jedis jedis=null;
		try {
			jedis = RedisManager.getJedis();
			if (jedis==null) {
				logger.error("redis服务连接失败！");
				return;
			}
			while(!isInterrupted()) {
				bindOximetry();
				// Blocking brpop: timeout 0 blocks until the list contains data.
				List<byte[]> list = jedis.brpop(0, rediskey);
				if (list==null) {
					continue;
				}
				ArrayList<Channel> channels=null;
				if (WebSocketManager.wsServer!=null) {
					channels = WebSocketManager.wsServer.channelsMap4Oximetry2.get(strMac);
				}
				// The algorithms run even when no WebSocket client is connected.
				for(byte[] bs : list) {
					// Skip the key that brpop returns together with the value.
					// NOTE(review): this relies on the key being shorter than 10 bytes - confirm.
					if (bs.length<10) {
						continue;
					}
					processPacket(bs, channels);
				}
			}
		} catch (Exception e) {
			logger.error("Oximetry consumer for "+strMac+" stopped by an exception", e);
			// If the exception was an InterruptedException the interrupt flag has been
			// cleared by the JVM; set it again so the interruption stays observable.
			Thread.currentThread().interrupt();
		} finally {
			try {
				RedisManager.close(jedis);
			} finally {
				// Release the native handles even when closing the redis connection fails.
				releaseNativeHandles();
			}
		}
	}

	/**
	 * Re-resolves {@link #bed} and {@link #oximetry} for {@link #strMac}.
	 *
	 * <p>Done on every iteration so that, when the MAC of a bed is changed in the
	 * back office and the map entry is replaced, this thread does not keep updating
	 * the stale bed object. A non-empty bed without an oximetry bean gets a new one.
	 */
	private void bindOximetry() throws Exception {
		oximetry=null;
		bed = DeviceSettings.hasBeds.get(strMac);
		if (null==bed||bed.getStatus().equals(PublicSetting.BED_STATUS_EMPTY)) {
			return;
		}
		oximetry=bed.getOximetry();
		if (oximetry==null) {
			oximetry=new Oximetry();
			bed.setOximetry(oximetry);
		}
	}

	/**
	 * Runs one packet through the filters, triggers the SpO2 calculation when the
	 * sample buffer is full and, if clients are connected, sends the waveform frame
	 * {@code /<mac>/Irfrared/<v1>,<v2>,.../<max>/<min>}.
	 *
	 * <p>Each sample occupies 6 bytes starting at offset 8: 3 bytes infrared followed
	 * by 3 bytes red.
	 * NOTE(review): the packet length is not validated against
	 * {@code 8 + 6 * OXI_POINTS_PER_PACK}; behaviour for short packets depends on
	 * {@code MathUtils.binary} - confirm.
	 *
	 * @param bs       raw packet popped from redis
	 * @param channels WebSocket channels registered for this MAC, may be null or empty
	 */
	private void processPacket(byte[] bs, ArrayList<Channel> channels) throws Exception {
		boolean hasListeners=channels!=null&&channels.size()>0;
		StringBuilder sbResultData=new StringBuilder();
		sbResultData.append("/"+strMac+"/Irfrared/");
		int headerLength=sbResultData.length();

		for(int i=0;i<OximetryAnalysis.OXI_POINTS_PER_PACK;i++){
			// Infrared light
			double rawIrfrared=Double.parseDouble(MathUtils.binary(ArrayUtils.subarray(bs, 8+6*i, 11+6*i), 10));
			lowPerf.m_dIrf_AC[lowPerf.curNum]=(float)rawIrfrared;
			double filtered=DllInterface.INSTANCE.butter_B(rawIrfrared, 0, lowPerf_dll);
			filtered=DllInterface.INSTANCE.MedFilterBO_B(filtered, 0, medfilterBO4Irfrared);

			// Red light
			double rawRed=Double.parseDouble(MathUtils.binary(ArrayUtils.subarray(bs, 11+6*i, 14+6*i), 10));
			// Track the extremes used by the client to draw the waveform.
			oximetryAnalysis.extremeStastics(filtered);
			lowPerf.m_dRed_AC[lowPerf.curNum]=(float)rawRed;
			lowPerf.curNum++;

			// Calc SpO2, PR, PI once the buffer is full.
			if (lowPerf.curNum==LowPerf.SPO2CALCBUFFLEN) {
				publishVitals();
			}

			// Resampling is only needed for the waveform sent to clients.
			if (hasListeners) {
				int nResamplingRet=DllInterface.INSTANCE.Resampling_B((float)1.0/OximetryAnalysis.SpO2InitialSample, (float)1.0/(oximetryAnalysis.npTargetSampleRate), filtered, 0, mathTools);
				for (int j = 0; j <= nResamplingRet; j++) {
					sbResultData.append(filtered);sbResultData.append(",");
				}
			}
		}

		// Nothing to send without clients. Also skip the frame when the resampler
		// produced no sample: stripping the "trailing comma" would otherwise remove
		// the '/' that terminates the header and shift every field of the frame.
		if (!hasListeners||sbResultData.length()==headerLength) {
			return;
		}
		sbResultData.deleteCharAt(sbResultData.length()-1);// drop the trailing comma
		sbResultData.append("/");
		sbResultData.append(oximetryAnalysis.maxNum);
		sbResultData.append("/");
		sbResultData.append(oximetryAnalysis.minNum);
		String msg = sbResultData.toString();
		// NOTE(review): the list comes from a shared map; if other threads modify it
		// while it is iterated here this loop can fail - confirm how it is maintained.
		for(Channel ch:channels){
			if (ch!=null&&ch.isOpen()) {
				ch.writeAndFlush(new TextWebSocketFrame(msg));
			}
		}
	}

	/**
	 * Calculates SpO2, pulse rate and PI from the buffered samples, resets the
	 * buffers and publishes the result to the bound bed and to redis.
	 */
	private void publishVitals() throws Exception {
		int[] nRetVal=new int[3];
		// NOTE(review): the sample count is hard-coded to 512 while the trigger is
		// LowPerf.SPO2CALCBUFFLEN - confirm both are meant to be the same value.
		DllInterface.INSTANCE.SpO2Calculate4WW(lowPerf.m_dRed_AC,lowPerf.m_dIrf_AC,512,nRetVal,spO2Algorithm_C);
		int spo2=nRetVal[0];
		int bpm=nRetVal[1];
		float pi=nRetVal[2]/10f;
		lowPerf.resetDataArr();

		// Update the cached indicators of the bed.
		if (oximetry!=null) {
			oximetry.setSpo2(spo2);
			// NOTE(review): the cast drops the fractional part of PI - confirm this is intended.
			oximetry.setPI((int)pi);
			oximetry.setPulserate(bpm);
		}
		if (bed!=null) {
			bed.setOxiUpdateMs(System.currentTimeMillis());
		}

		// Store the calculated values in redis.
		// NOTE(review): the keys do not contain the MAC, so every consumer thread
		// writes to the same three keys - confirm this is intended.
		RedisManager.set("Spo2", String.valueOf(spo2));
		RedisManager.set("pi", String.valueOf(pi));
		RedisManager.set("bpm", String.valueOf(bpm));
	}

	/** Hands every native handle created by the field initialisers back to the dll. */
	private void releaseNativeHandles() {
		DllInterface.INSTANCE.MedFilterBO_D(medfilterBO4Irfrared);
		DllInterface.INSTANCE.MedFilterBO_D(medfilterBO4Red);
		DllInterface.INSTANCE.IIFilter_D(iirFilter4Irfrared);
		DllInterface.INSTANCE.IIFilter_D(iirFilter4Red);
		DllInterface.INSTANCE.butter_D(lowPerf_dll);
		DllInterface.INSTANCE.SSF_Calc_D(SSF_Calc);
		DllInterface.INSTANCE.Resampling_D(mathTools);
		DllInterface.INSTANCE.SpO2Algorithm_D(spO2Algorithm_C);
	}
}
